[アップデート] Lambda のイベントソースに Amazon MSK が設定できるようになりました
先日のアップデートで Amazon MSK(Managed Streaming for Apache Kafka) を Lambda のソースイベントとして利用できるようになりました!
アップデートされて直ぐに触ってたのですが、まだ MSK まわりは情報も少なく、うまく検証が進まなかったのですが、本日リリースされていた公式ブログでようやく理解が深まりました。(Thank you, James!!)
何がうれしいのか
Amazon MSK はフルマネージドな Apache Kafka のサービスで、最大のメリットは「完全互換」があることでしょう。現在、オンプレ環境で利用中の Apache Kafka を利用したデータパイプラインを大きな改修することなく AWS に移行することが容易です。
しかし、Lambda などの AWS サービスと連携させるためには Kafka Connector などのフレームワークを EC2 で準備し、コンシューマー側の Sink Connector からトピックを pull し Lambda を呼び出す、といった仕組みが必要でした。
そのため、従来だと AWS サービス連携させるようなユースケースでは、やはり Amazon Kinesis Data Streams が推奨とされてきました。
それが今回のアップデートによって、コンシューマ側の Connector なしに Lambda を呼び出すことができるようになりました。Lambda を起点にその他の AWS サービスとの連携も柔軟にできますので、 Sink Connector を Lambda に置き換えることで可用性は Lambda 基盤に任せられますし、EC2 のコスト負担もなくなるので嬉しいかぎりではないでしょうか。
注意点
MSK ブローカーから Lambda を呼び出す
Amazon MSK はイベントソースマッピングを介して Lambda を呼び出します。そのため、MSK ブローカーが AWS のエンドポイントにアクセスできる経路が必要となります。多くの場合、ブローカーはプライベートサブネットに配置されていると思いますので、NAT ゲートウェイが必要となります。
(引用元:Using Amazon MSK as an event source for AWS Lambda)
また、Lambda も MSK クラスターが配置されいている VPC へのアクセスを必要とするため、VPC Lambda として作成します。
レコードの Key-Value セットは base64 エンコードされる
Amazon MSK レコードの Key-Value セットは base64 でエンコードされていますので、必要に応じてデコードが必要となります。
Received event:{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/Exampgit add lesMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4", "records": { "AWSKafkaTopic-0": [ { "topic": "AWSKafkaTopic", "partition": 0, "offset": 0, "timestamp": 1595035749700, "timestampType": "CREATE_TIME", "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj" } ] } }
サポートされない Kafka の機能
以下の機能は Lambda のイベントソースではサポートされません。
- 認証 : SSL および SASL ベースの認証はサポートされません
- Schema Registry
やってみる
はじめに
今回の検証はバージニアリージョンで実施しています。東京リージョンでも機能はサポートされているのですが、執筆時点においては何故か MSK のトピックにメッセージを投入しても Lambda を呼び出すことが出来ませんでした。
イベントソースマッピングのステータスを確認すると PROBLEM: Connection error, Please check your event source connection configuration.
と表示されます。
ずっと悩んでいたのですが、同樣の構成をバージニアで作ったところ、サクッと確認できましたのでリージョン固有の問題なのかもしれません。(いずれ解決されると思います)
事前準備
以下の環境は事前に作成済みとします。
- VPC
- サブネット
- NAT Gateway
- セキュリティグループ
- MSK クライアント(EC2)
- Amazon MSK クラスター
セキュリティグループ
MSK クラスターとの接続には以下のポートが必要となります。
ポート | 用途 |
---|---|
9092 | ブローカーがプレーンテキストでプロヂューサーおよびコンシューマと通信する |
9094 | ブローカーがTLSでプロヂューサーおよびコンシューマと通信する |
2181 | Apache ZooKeeperノードと通信する |
当セキュリティグループ自身からの通信のみを許可するように設定しておき、MSK クラスター、MSK クライアント、VPC Lambda にアタッチしておくとシンプルにまとめることが出来ます。
MSK クライアント
Amazon MSK はブローカーノードや、ZooKeeper ノードなどの基盤はマネージドサービスとして提供されていますが、Kafka そのものの設定等は AWS API では提供されておらず、従来の Kafka API を利用します。
そのため、Kafka API を発行するためのクライアントが必要となります。チュートリアルを参考にすると容易にセットアップできます。
Amazon MSK クラスターの作成
こちらも公式ガイドのチュートリアル に従い MSK クラスターを作成し、¥AWSKafkaTutorialTopic
トピックまで作成しておきます。
Lambda トリガーの設定
IAM ロールの作成
Amazon MKS 向けの AWS 管理ポリシー として AWSLambdaMSKExecutionRole
が提供されていますので、当該ポリシーをアタッチした IAM ロールを作成します。信頼ポリシーも忘れずに lambda.amazonaws.com
を設定してください。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
関数の作成
今回は公式ブログよりコードを拝借いたしましたので、Node.js 12.x
で作成しています。コードは以下のとおり。
exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
MSK クラスターの所属する VPC にアクセスする必要があるため、VPC Lambda として作成しています。また、VPC Lambda にアタッチするセキュリティグループは事前に作成しておいた MSK クラスターまわりで共通のものを設定しました。
トリガーの追加(イベントソースマッピング設定)
トリガーの設定を確認すると [MSK] が選択可能になっていますので、これを選択します。事前に作成した MSK クラスターおよびトピック名を指定します。(チュートリアルに従って作成した場合、トピック名は AWSKafkaTutorialTopic
です)。開始位置は 水平トリム(Trim Horizon)
または最新(Latest)
のいずれかを選択し、追加します。
しばらくして以下のように Creating
から 有効
に変わっていれば設定完了です。
コンシューマグループはこのイベントソースマッピング UUID と同じ ID で作成されます。
$ aws lambda list-event-source-mappings --region us-east-1 EventSourceMappings: - BatchSize: 100 EventSourceArn: arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/test-msk/2df3da3b-e692-489c-b076-026dafb88ecd-1 FunctionArn: arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:msk-test LastModified: '2020-08-15T10:50:37.358000+09:00' LastProcessingResult: No records processed State: Enabled StateTransitionReason: USER_INITIATED Topics: - AWSKafkaTutorialTopic UUID: 870e908b-4d5a-4877-a287-1461d0eaef96
確認
準備ができたので、プロデューサーからメッセージを送信してみます。
$ ./kafka-console-producer.sh \ --broker-list "b-2.demo-cluster.4z02g6.c4.kafka.ap-northeast-1.amazonaws.com:9094,b-1.demo-cluster.4z02g6.c4.kafka.ap-northeast-1.amazonaws.com:9094" \ --producer.config client.properties \ --topic AWSKafkaTutorialTopic >Classmethod!
Lambda のログを確認すると・・・
メッセージを受信できていますね!検証は以上です。
さいごに
これまで Lambda と連携するには、ひと手間が必要だった Amazon MSK ですが、今回のアップデートにより非常に簡単に MSK クラスターから Lambda 連携できるようになったことか確認できました。
Amazon MSK においても後続のサーバレスアーキテクチャと統合しやすくなる、良いアップデートではないでしょうか!
以上!大阪オフィスの丸毛(@marumo1981)でした!